package com.veromca.kafka.kafka_demo.producer;

import java.util.Properties;

import kafka.serializer.StringEncoder;
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  

public class KafkaProducer extends Thread {
	private final kafka.javaapi.producer.Producer<String, String> producer; 
	private String topic;
	
	public KafkaProducer(String topic){
		super();
		this.topic = topic;
		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "192.168.1.113:2181");
		props.setProperty("serializer.class", StringEncoder.class.getName());	
		props.put("key.serializer.class", "kafka.serializer.StringEncoder");
		props.put("request.required.acks", "1");
		props.put("metadata.broker.list", "192.168.1.113:9092"); 
		producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props)); 
	}
	
	@Override
	public void run(){
		for (int i = 0; i < 50; i++) {
			 String messageStr = new String("Message_" + i+1);  
	         System.out.println("Send:" + messageStr);  
			producer.send(new KeyedMessage<String, String>(topic, messageStr));
		}
		   producer.close();
	}
	
	public static void main(String[] args) {
		new KafkaProducer("test3").start();
	}
}
